package com.huan.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.nio.charset.StandardCharsets;

/**
 * flink 程序打包，具体打包注意事项，见 readme.md 文件
 *
 * @author huan.fu
 * @date 2023/9/17 - 23:31
 */
public class FlinkPackageApplication {

    public static void main(String[] args) throws Exception {
        // 获取执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2、从文件中读取数据
        String filePath = "/Users/huan/code/IdeaProjects/me/spring-cloud-parent/flink/flink-build-package/src/main/resources/word.txt";
        environment.readTextFile(filePath, StandardCharsets.UTF_8.name())
                // 3、将读取到一行数据进行切割、转换
                .flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (line, collector) -> {
                    // 每一行以空格进行分隔
                    String[] words = line.split(" ");
                    for (String word : words) {
                        // 使用 collector 向下游发送数据
                        collector.collect(Tuple2.of(word, 1));
                    }
                })
                // 使用 returns 明确指定返回类型
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // 4、根据 word 进行分组 0表示的是 Tuple2对象中的第一个字段的值
                .keyBy((KeySelector<Tuple2<String, Integer>, String>) tuple2 -> tuple2.f0)
                // 5、各分组内进行聚合 1表示的是 Tuple2对象中的第二个字段的值
                .sum(1)
                // 6、输出
                .print();
        environment.execute("DataStreamApiWordCount");
    }

}
